# Part of Odoo. See LICENSE file for full copyright and licensing details.

import logging
import requests
from lxml import etree
from markupsafe import Markup
from hashlib import md5
from urllib import parse

from odoo import api, fields, models
from odoo.addons.account_peppol.tools.demo_utils import handle_demo
from odoo.addons.account.models.company import PEPPOL_LIST

# Value passed as `timeout=` to every `requests.get` call issued from this module.
TIMEOUT = 10
# Module-level logger, named after the module's import path.
_logger = logging.getLogger(__name__)



class ResPartner(models.Model):
    # Extends the existing `res.partner` model with Peppol-specific fields and checks.
    _inherit = 'res.partner'

    # Adds 'peppol' as an extra choice to the inherited sending-method selection.
    invoice_sending_method = fields.Selection(
        selection_add=[('peppol', 'by Peppol')],
    )
    peppol_eas = fields.Selection(selection_add=[('odemo', 'Odoo Demo ID')])  # Not a real EAS, used for demonstration.
    # Non-stored helper lists (JSON) filled by the compute methods below.
    available_peppol_sending_methods = fields.Json(compute='_compute_available_peppol_sending_methods')
    available_peppol_edi_formats = fields.Json(compute='_compute_available_peppol_edi_formats')
    # Result of the last Peppol lookup; `company_dependent=True` means one value per company.
    peppol_verification_state = fields.Selection(
        selection=[
            ('not_verified', 'Unchecked'),
            ('not_valid', 'Partner is not on Peppol'),  # does not exist on Peppol at all
            ('not_valid_format', 'Partner cannot receive format'),  # registered on Peppol but cannot receive the selected document type
            ('valid', 'Partner is on Peppol'),
        ],
        string='Peppol status',
        company_dependent=True,
    )

    @api.onchange('invoice_edi_format', 'peppol_endpoint', 'peppol_eas')
    def _onchange_verify_peppol_status(self):
        """Re-run the Peppol endpoint check when the EDI format, the endpoint
        or the EAS of the partner is edited.

        The result of the check is discarded: the check itself updates
        `peppol_verification_state` on the record.
        """
        partner = self
        partner.button_account_peppol_check_partner_endpoint()

    # -------------------------------------------------------------------------
    # COMPUTE METHODS
    # -------------------------------------------------------------------------

    @api.depends_context('company')
    @api.depends('company_id')
    def _compute_available_peppol_sending_methods(self):
        """Compute the keys of the sending methods selectable on the partners.

        All keys of the `invoice_sending_method` selection are offered, except
        'peppol' when the country code of the current company is not part of
        `PEPPOL_LIST`. The same list is assigned to every record of `self`.
        """
        sending_methods = dict(self._fields['invoice_sending_method'].selection)
        company_country = self.env.company.country_code
        if company_country not in PEPPOL_LIST:
            del sending_methods['peppol']
        self.available_peppol_sending_methods = list(sending_methods)

    @api.depends_context('company')
    @api.depends('invoice_sending_method')
    def _compute_available_peppol_edi_formats(self):
        """Compute, per partner, the EDI formats that may be selected.

        Partners sending by Peppol are limited to the formats returned by
        `_get_peppol_formats()`; any other partner gets every key of the
        `invoice_edi_format` selection.
        """
        for partner in self:
            sends_by_peppol = partner.invoice_sending_method == 'peppol'
            partner.available_peppol_edi_formats = (
                self._get_peppol_formats()
                if sends_by_peppol
                else list(dict(self._fields['invoice_edi_format'].selection))
            )

    def _compute_available_peppol_eas(self):
        """Hide the demonstration EAS ('odemo') outside of demo mode.

        EXTENDS 'account_edi_ubl_cii'. The parent compute fills
        `available_peppol_eas`; when the current company is not in Peppol
        demo mode, the 'odemo' code is filtered out of that list for every
        record of `self`.
        """
        super()._compute_available_peppol_eas()
        if self.env.company._get_peppol_edi_mode() == 'demo':
            return

        # The value of the first record is used as the reference list, as the
        # parent is expected to assign the same value to all records.
        # `or []` guards against a falsy value (e.g. when `self` is empty),
        # which could not be iterated.
        eas_codes = self[:1].available_peppol_eas or []
        if 'odemo' in eas_codes:
            # Filter in place of a set round-trip so the original ordering of
            # the codes is kept and the result is the same on every run.
            self.available_peppol_eas = [code for code in eas_codes if code != 'odemo']

    # -------------------------------------------------------------------------
    # HELPERS
    # -------------------------------------------------------------------------

    def _log_verification_state_update(self, company, old_value, new_value):
        """Log a change of the Peppol verification state on the partner.

        Regular field tracking is not used here because the message is
        customized and because the change has to be logged for every company
        in the db. Nothing is logged when the state did not change.

        :param company: company whose `display_name` is shown in the message
        :param old_value: previous selection key (may be falsy)
        :param new_value: new selection key (may be falsy)
        """
        if old_value == new_value:
            return

        state_field = self._fields['peppol_verification_state']
        labels_by_key = dict(state_field.selection)
        # A falsy state has no selection entry: False is shown in its place.
        old_label, new_label = (
            labels_by_key[state] if state else False
            for state in (old_value, new_value)
        )

        tracking_template = Markup("""
            <ul>
                <li>
                    <span class='o-mail-Message-trackingOld me-1 px-1 text-muted fw-bold'>{old}</span>
                    <i class='o-mail-Message-trackingSeparator fa fa-long-arrow-right mx-1 text-600'/>
                    <span class='o-mail-Message-trackingNew me-1 fw-bold text-info'>{new}</span>
                    <span class='o-mail-Message-trackingField ms-1 fst-italic text-muted'>({field})</span>
                    <span class='o-mail-Message-trackingCompany ms-1 fst-italic text-muted'>({company})</span>
                </li>
            </ul>
        """)
        message_body = tracking_template.format(
            old=old_label,
            new=new_label,
            field=state_field.string,
            company=company.display_name,
        )
        self._message_log(body=message_body)

    @api.model
    def _get_participant_info(self, edi_identification):
        """Fetch the participant description directly from its SMP.

        DEPRECATED: Peppol moved from CNAME to NAPTR DNS records.

        The SMP host name is built from the MD5 hex digest of the lowercased
        identifier; the SML zone depends on the Peppol EDI mode of the
        current company ('test' uses the acceptance zone).

        :param edi_identification: participant identifier, `eas:endpoint`
        :return: the parsed XML root of the response, or None when the HTTP
            request failed or returned an error status
        """
        participant_hash = md5(edi_identification.lower().encode()).hexdigest()
        quoted_participant = parse.quote_plus(f"iso6523-actorid-upis::{edi_identification}")
        is_test_mode = self.env.company._get_peppol_edi_mode() == 'test'
        sml_zone = 'acc.edelivery' if is_test_mode else 'edelivery'
        smp_url = f"http://B-{participant_hash}.iso6523-actorid-upis.{sml_zone}.tech.ec.europa.eu/{quoted_participant}"

        try:
            response = requests.get(smp_url, timeout=TIMEOUT)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            _logger.debug(e)
            return None
        else:
            return etree.fromstring(response.content)

    @api.model
    @handle_demo
    def _check_peppol_participant_exists(self, participant_info, edi_identification):
        """Tell whether the looked-up participant is a real Peppol participant.

        :param participant_info: either the JSON dict returned by the Odoo
            Peppol lookup API (keys read here: 'identifier' and 'services',
            each service being a dict with an 'href'), or - deprecated - the
            XML element fetched directly from the SMP
        :param edi_identification: expected identifier, `eas:endpoint`
        :return: True when the identifier of the participant matches
            `edi_identification` (case-insensitively) and the href of its
            first service does not point to hermes-belgium
        """
        if isinstance(participant_info, dict):
            # `or` fallbacks (instead of `.get(key, default)`) also cover keys
            # that are present with a null value in the JSON payload, which
            # would otherwise crash on `.lower()` / `in` below.
            participant_identifier = participant_info.get('identifier') or ''
            services = participant_info.get('services') or []
            service_href = (services[0].get('href') or '') if services else ''
        else:
            # DEPRECATED: we now use Odoo peppol API to fetch participant info and get a json response
            # keeping this branch for compatibility
            participant_identifier = participant_info.findtext('{*}ParticipantIdentifier') or ''
            service_metadata = participant_info.find('.//{*}ServiceMetadataReference')
            service_href = '' if service_metadata is None else service_metadata.attrib.get('href', '')

        # all Belgian companies are pre-registered on hermes-belgium, so they will
        # technically have an existing SMP url but they are not real Peppol participants
        # NOTE: peppol identifier must be case insensitive
        return edi_identification.lower() == participant_identifier.lower() and 'hermes-belgium' not in service_href

    @api.model
    def _peppol_lookup_participant(self, edi_identification):
        """NAPTR DNS peppol participant lookup through Odoo's Peppol proxy

        The proxy URL is selected from the Peppol EDI mode of the current
        company; no request is made in 'demo' mode.

        :param edi_identification: participant identifier, `eas:endpoint`
            (lowercased before being sent)
        :return: the 'result' member of the JSON response, or None in demo
            mode, on network failure, on a non-JSON or malformed body, when
            the response carries an 'error' member or when the HTTP status is
            not successful
        """
        if (edi_mode := self.env.company._get_peppol_edi_mode()) == 'demo':
            return None

        origin = self.env['account_edi_proxy_client.user']._get_proxy_urls()['peppol'][edi_mode]
        query = parse.urlencode({'peppol_identifier': edi_identification.lower()})
        endpoint = f'{origin}/api/peppol/1/lookup?{query}'

        try:
            response = requests.get(endpoint, timeout=TIMEOUT)
        except requests.exceptions.RequestException as e:
            _logger.debug("failed to query peppol participant %s: %s", edi_identification, e)
            return None

        try:
            decoded_response = response.json()
        except ValueError:
            _logger.error('invalid JSON response %s when querying peppol participant %s', response.status_code, edi_identification)
            return None

        # A syntactically valid JSON body is not necessarily an object (it can
        # be a list, a string, null...). Treat anything else as a failed
        # lookup rather than crashing on `.get()`.
        if not isinstance(decoded_response, dict):
            _logger.error('unexpected JSON payload in response %s when querying peppol participant %s', response.status_code, edi_identification)
            return None

        if error := decoded_response.get('error'):
            # Tolerate an error given as a bare value instead of an object.
            error_details = error if isinstance(error, dict) else {'message': error}
            # NOT_FOUND is the expected answer for unknown participants: stay quiet.
            if error_details.get('code') != 'NOT_FOUND':
                _logger.error('error when querying peppol participant %s: %s', edi_identification, error_details.get('message', 'unknown error'))
            return None

        if not response.ok:
            _logger.error('unsuccessful response %s when querying peppol participant %s', response.status_code, edi_identification)
            return None

        return decoded_response.get('result')

    def _check_document_type_support(self, participant_info, ubl_cii_format, process_type='billing'):
        """Tell whether the participant advertises the given document format.

        The customization id expected for `ubl_cii_format` / `process_type`
        is obtained from the EDI builder and searched as a substring in the
        services of the participant.

        :param participant_info: JSON dict from the lookup API (each service
            is matched on its 'document_id'), or - deprecated - the XML
            element fetched from the SMP (each service reference is matched
            on its URL-decoded 'href')
        :param ubl_cii_format: format key handed to `_get_edi_builder`
        :param process_type: forwarded to `_get_customization_id`
        :return: True as soon as one service matches, False otherwise
        """
        builder = self._get_edi_builder(ubl_cii_format)
        customization_id = builder._get_customization_id(process_type=process_type)

        if not isinstance(participant_info, dict):
            # DEPRECATED: participant_info as XML fetched directly from SMP
            references = participant_info.findall(
                '{*}ServiceMetadataReferenceCollection/{*}ServiceMetadataReference'
            )
            return any(
                customization_id in parse.unquote_plus(reference.attrib.get('href', ''))
                for reference in references
            )

        for service in participant_info.get('services', []):
            if customization_id in (service.get('document_id') or ''):
                return True
        return False

    def _update_peppol_state_per_company(self, vals=None):
        """Refresh the Peppol verification state of the partners, per company.

        :param vals: optional dict of written values. When None, only the
            partners having an EAS, an endpoint, a UBL format and a country
            in `PEPPOL_LIST` are checked. When given, partners whose country
            is in `PEPPOL_LIST` are checked, and only if one of the Peppol
            related keys is part of `vals`.

        A partner bound to a company is checked for that company only. Other
        partners are checked for each company whose
        `account_peppol_proxy_state` is one of the "can send" states.
        """
        def _is_peppol_candidate(partner):
            return all([partner.peppol_eas, partner.peppol_endpoint, partner.is_ubl_format, partner.country_code in PEPPOL_LIST])

        if vals is None:
            partners = self.filtered(_is_peppol_candidate)
        elif not {'peppol_eas', 'peppol_endpoint', 'invoice_edi_format'}.isdisjoint(vals.keys()):
            partners = self.filtered(lambda p: p.country_code in PEPPOL_LIST)
        else:
            partners = self.env['res.partner']

        # Searched lazily, at most once, for the partners without a company.
        peppol_companies = None
        for partner in partners.sudo():
            companies = partner.company_id
            if not companies:
                if peppol_companies is None:
                    # We only check it for companies that are actually using Peppol.
                    can_send = self.env['account_edi_proxy_client.user']._get_can_send_domain()
                    peppol_companies = self.env['res.company'].sudo().search([
                        ('account_peppol_proxy_state', 'in', can_send),
                    ])
                companies = peppol_companies

            for company in companies:
                partner.button_account_peppol_check_partner_endpoint(company=company)

    # -------------------------------------------------------------------------
    # LOW-LEVEL METHODS
    # -------------------------------------------------------------------------

    @api.model_create_multi
    def create(self, vals_list):
        """Create the partners, then compute their Peppol verification state.

        :param vals_list: list of value dicts, forwarded to the parent `create`
        :return: the created records
        """
        partners = super().create(vals_list)
        if not partners:
            return partners
        partners._update_peppol_state_per_company()
        return partners

    # -------------------------------------------------------------------------
    # BUSINESS ACTIONS
    # -------------------------------------------------------------------------

    def button_account_peppol_check_partner_endpoint(self, company=None):
        """ A basic check for whether a participant is reachable at the given
        Peppol participant ID - peppol_eas:peppol_endpoint (ex: '9999:test')
        The SML (Service Metadata Locator) assigns a DNS name to each peppol participant.
        This DNS name resolves into the SMP (Service Metadata Publisher) of the participant.
        The DNS address is of the following form:
        strip-trailing(base32(sha256(lowercase(ID-VALUE))),"=") + "." + ID-SCHEME + "." + SML-ZONE-NAME
        The lookup should be done on NAPTR DNS from 2025-11-01
        (ref:https://peppol.helger.com/public/locale-en_US/menuitem-docs-doc-exchange)

        For the Belgian schemes ('0208' and '9925'), when the partner is not
        found as valid, the other scheme is tried as well; if that one is
        valid, the EAS and endpoint of the partner are rewritten accordingly.

        :param company: company for which the state is checked and stored;
            defaults to the current company
        :return: always False
        """
        self.ensure_one()
        if not company:
            company = self.env.company

        self_partner = self.with_company(company)
        eas = self_partner.peppol_eas
        endpoint = self_partner.peppol_endpoint
        if not eas or not endpoint:
            return False

        edi_format = self_partner._get_peppol_edi_format()
        old_value = self_partner.peppol_verification_state
        # The lookup is run on `self_partner` so that everything reading
        # `self.env.company` downstream (e.g. the Peppol EDI mode) is
        # evaluated for `company`, the company the state is stored for,
        # rather than for whichever company is current in the caller's env.
        new_value = self_partner._get_peppol_verification_state(endpoint, eas, edi_format)

        if new_value != 'valid' and eas in ('0208', '9925'):
            # checks the inverse `eas:endpoint` if the belgian user was not found on Peppol in the first try
            if eas == '0208':
                inverse_eas = '9925'
                inverse_endpoint = f'BE{endpoint}'
            else:
                inverse_eas = '0208'
                # Only drop the two leading characters when they really are
                # the 'BE' country prefix; an endpoint entered without it
                # must not lose its first two digits.
                has_country_prefix = endpoint[:2].upper() == 'BE'
                inverse_endpoint = endpoint[2:] if has_country_prefix else endpoint

            inverse_state = self_partner._get_peppol_verification_state(inverse_endpoint, inverse_eas, edi_format)
            if inverse_state == 'valid':
                self_partner.write({
                    'peppol_eas': inverse_eas,
                    'peppol_endpoint': inverse_endpoint,
                })
                new_value = inverse_state

        if old_value != new_value:
            self_partner.peppol_verification_state = new_value
            self._log_verification_state_update(company, old_value, self_partner.peppol_verification_state)
        return False

    @api.model
    @handle_demo
    def _get_peppol_verification_state(self, peppol_endpoint, peppol_eas, invoice_edi_format, process_type='billing'):
        """Compute the verification state of a Peppol identifier.

        :param peppol_endpoint: endpoint part of the identifier
        :param peppol_eas: EAS (scheme) part of the identifier
        :param invoice_edi_format: format the participant should be able to receive
        :param process_type: forwarded to `_check_document_type_support`
        :return: one of
            - 'not_verified': EAS or endpoint missing, or format not a Peppol format
            - 'not_valid': lookup returned nothing, or participant not on the network
            - 'not_valid_format': participant found but format not supported
            - 'valid': participant found and format supported
        """
        has_identifier = peppol_eas and peppol_endpoint
        if not has_identifier or invoice_edi_format not in self._get_peppol_formats():
            return 'not_verified'

        edi_identification = f"{peppol_eas}:{peppol_endpoint}".lower()
        participant_info = self._peppol_lookup_participant(edi_identification)
        if participant_info is None:
            return 'not_valid'

        if not self._check_peppol_participant_exists(participant_info, edi_identification):
            return 'not_valid'

        if self._check_document_type_support(participant_info, invoice_edi_format, process_type=process_type):
            return 'valid'
        return 'not_valid_format'

    def _get_frontend_writable_fields(self):
        """Add the Peppol EAS and endpoint to the fields returned by the
        parent `_get_frontend_writable_fields`.

        :return: the collection returned by the parent, updated in place
        """
        writable_fields = super()._get_frontend_writable_fields()
        for field_name in ('peppol_eas', 'peppol_endpoint'):
            writable_fields.update({field_name})

        return writable_fields
